package com.nx.platform.es.mq.producer;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;

/**
 * MQ消息生产者（工具类）
 */
public class MQProducer {
    private static Logger logger = LoggerFactory.getLogger(MQProducer.class);
    private static volatile MQProducer instance;
    private DefaultMQProducer producer;

    private MQProducer() {
        producer = new DefaultMQProducer();
        try {
            //producer.init(MQTopics.PRODUCER_CONFIG);
            producer.start();
        } catch ( MQClientException e) {
            logger.error("new MQProducer error.{}", e.getMessage());
        }
    }

    public static MQProducer getInstance() {
        if (instance == null) {
            synchronized (MQProducer.class) {
                if (instance == null) {
                    instance = new MQProducer();
                }
            }
        }
        return instance;
    }

    /**
     * 发送MQ消息
     * 
     * @param msg
     * @param topic
     * @param tag
     * @param logStr
     * @return
     */
    public boolean sendMessage(String msg, String topic, String tag, String logStr) {
//        ZAssert.notEmpty(msg, ErrorStatus.PARAM_EMPTY);
//        ZAssert.notEmpty(topic, ErrorStatus.PARAM_EMPTY);
//        ZAssert.notEmpty(tag, ErrorStatus.PARAM_EMPTY);
        byte[] bytes = new byte[0];
        try {
            bytes = msg.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            logger.error("{} desc=sendMessage, ex={}", logStr, e);
        }
        Message message = new Message(topic, tag, logStr, bytes);
        return sendMessage(message, logStr);
    }

    /**
     * 普通发消息
     * 
     * @param message
     * @param logStr
     * @return
     */
    public Boolean sendMessage(Message message, String logStr) {
        logger.info("{} desc=sendMessage, message={}", logStr, message.toString());
        try {
            SendResult sendResult = producer.send(message);
            logger.info("{} desc=sendMessage sendResult={}", logStr, sendResult.toString());
            return sendResult.getSendStatus().equals(SendStatus.SEND_OK);
        } catch (MQClientException | RemotingException | MQBrokerException ex) {
            logger.error("{} desc=sendMessage, ex={}", logStr, ex);
        } catch (InterruptedException ex1) {
            logger.error("{} desc=asyncSendMessage, ex={}", logStr, ex1);
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /**
     * 发送异步消息
     *
     * @param msg
     * @param topic
     * @param tag
     * @param callback
     * @param logStr
     */
    public void asyncSendMessage(String msg, String topic, String tag, SendCallback callback, String logStr) {
//        ZAssert.notEmpty(msg, ErrorStatus.PARAM_EMPTY);
//        ZAssert.notEmpty(topic, ErrorStatus.PARAM_EMPTY);
//        ZAssert.notEmpty(tag, ErrorStatus.PARAM_EMPTY);
        byte[] bytes = new byte[0];
        try {
            bytes = msg.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            logger.error("{} desc=sendMessage, ex={}", logStr, e);
        }
        Message message = new Message(topic, tag, logStr, bytes);
        asyncSendMessage(message, callback, logStr);
    }

    /**
     * 发送回调消息
     * 
     * @param message
     * @param sendCallback
     * @param logStr
     */
    public void asyncSendMessage(Message message, SendCallback sendCallback, String logStr) {
        logger.info("{} desc=asyncSendMessage, message={}", logStr, message.toString());
        try {
            producer.send(message, sendCallback);
        } catch (MQClientException | RemotingException ex) {
            logger.error("{} desc=asyncSendMessage, ex={}", logStr, ex);
        } catch (InterruptedException ex1) {
            logger.error("{} desc=asyncSendMessage, ex={}", logStr, ex1);
            Thread.currentThread().interrupt();
        }
    }

}
